package com.atguigu.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.HBaseUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Maps a change record of the {@code table_process} configuration table (as a JSON string)
 * to a {@link TableProcess} bean and, for rows whose sink type is {@code "dim"}, keeps the
 * corresponding HBase table in step with the configuration via {@link HBaseUtil}:
 * <ul>
 *   <li>{@code op == "d"}: the table is dropped;</li>
 *   <li>{@code op == "u"}: the table is dropped and then created again;</li>
 *   <li>any other op (e.g. snapshot read or insert): the table is created.</li>
 * </ul>
 * Records whose sink type is not {@code "dim"} are mapped to {@code null}.
 */
public class DimCreateTableRichMapFunction extends RichMapFunction<String, TableProcess> {

    // Created in open() and released in close(); transient so the connection is never
    // part of the serialized function object.
    private transient Connection connection;

    /**
     * Obtains the HBase connection used by {@link #map(String)}.
     *
     * @param parameters configuration handed in by the runtime (not used here)
     * @throws Exception if {@link HBaseUtil#getConnection()} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialise the connection.
        connection = HBaseUtil.getConnection();
    }

    // Example value:
    //value:{"before":null,"after":{"source_table":"base_category3","sink_type":"dim","sink_table":"dim_base_category3","sink_columns":"id,name,category2_id","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669162876406,"snapshot":"false","db":"gmall-220623-config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669162876406,"transaction":null}
    /**
     * Parses one change record, applies the matching table operation for dim sinks and
     * returns the parsed configuration row.
     *
     * @param value change record as a JSON string with {@code op}, {@code before} and
     *              {@code after} fields
     * @return the parsed {@link TableProcess} with its op set when the sink type is
     *         {@code "dim"}; {@code null} otherwise
     *         (NOTE(review): downstream operators must tolerate or filter null - confirm)
     * @throws IllegalArgumentException if the row image required for the op is missing
     * @throws Exception if parsing or the table operation fails
     */
    @Override
    public TableProcess map(String value) throws Exception {

        // 1. Convert the record to a JSON object.
        JSONObject jsonObject = JSON.parseObject(value);

        // 2. Read the operation type and convert the row image to a JavaBean.
        //    A delete only carries the "before" image; every other op uses "after".
        String op = jsonObject.getString("op");
        boolean isDelete = "d".equals(op);
        String rowJson = jsonObject.getString(isDelete ? "before" : "after");
        TableProcess tableProcess = JSON.parseObject(rowJson, TableProcess.class);
        if (tableProcess == null) {
            // Fail with context instead of a bare NullPointerException on setOp below.
            throw new IllegalArgumentException(
                    "Missing '" + (isDelete ? "before" : "after") + "' image for op '" + op
                            + "' in record: " + value);
        }
        tableProcess.setOp(op);

        // Only dim sinks are handled here; everything else maps to null.
        if (!"dim".equals(tableProcess.getSinkType())) {
            return null;
        }

        // 3. Drop and/or create the table depending on the operation type.
        String sinkTable = tableProcess.getSinkTable();
        if (isDelete || "u".equals(op)) {
            // Configuration row deleted, or updated (drop first, then recreate below).
            HBaseUtil.dropTable(connection, Constant.HBASE_NAME_SPACE, sinkTable);
        }
        if (!isDelete) {
            // Update, first read, or newly inserted configuration row.
            createFlinkDimTable(connection, sinkTable, tableProcess.getSinkExtend(), tableProcess.getSinkFamily());
        }

        // 4. Return the result.
        return tableProcess;
    }

    /**
     * Creates the table through {@link HBaseUtil#createTable}, deriving the split keys from
     * {@code sinkExtend}.
     *
     * @param connection HBase connection to use
     * @param tableName  name of the table to create in {@link Constant#HBASE_NAME_SPACE}
     * @param sinkExtend comma-separated split keys (e.g. {@code 01|,02|,03|}), or
     *                   {@code null} to pass no split keys
     * @param cf         column family argument forwarded to {@code HBaseUtil.createTable}
     * @throws RuntimeException wrapping the {@link IOException} if table creation fails
     */
    private static void createFlinkDimTable(Connection connection, String tableName, String sinkExtend, String cf) {

        byte[][] splitKeys = null;

        if (sinkExtend != null) {
            // sinkExtend  ==>  01|,02|,03|...
            String[] parts = sinkExtend.split(",");
            splitKeys = new byte[parts.length][];
            for (int i = 0; i < parts.length; i++) {
                // Explicit charset so the keys do not depend on the JVM's default encoding.
                splitKeys[i] = parts[i].getBytes(StandardCharsets.UTF_8);
            }
        }

        // Create the table.
        try {
            HBaseUtil.createTable(connection, Constant.HBASE_NAME_SPACE, tableName, splitKeys, cf);
        } catch (IOException e) {
            // Keep the original exception as the cause so the root failure is not lost.
            throw new RuntimeException("创建表：" + tableName + "失败！", e);
        }
    }

    /**
     * Closes the HBase connection if one was opened.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        // connection is still null when open() failed before assigning it.
        if (connection != null) {
            connection.close();
        }
    }
}
